[MIG] base_geoengine: Migration to 19.0#4
[MIG] base_geoengine: Migration to 19.0#4weinni2000 wants to merge 3 commits into19.0.integrationfrom
Conversation
Reviewer's GuideMigration of base_geoengine to Odoo 19.0 by refactoring geospatial SQL generation to leverage the new Domain API, introducing a dedicated domains.py with enhanced DomainCondition methods, and updating code conventions for CR access and translations. Class diagram for enhanced DomainCondition geospatial supportclassDiagram
class DomainCondition {
+field_expr: str
+operator: str
+value: any
+_opt_level: OptimizationLevel
+checked()
+_to_sql(model: BaseModel, alias: str, query: Query): SQL
+_optimize_step(model: BaseModel, level: OptimizationLevel): Domain
}
class BaseModel {
+_fields: dict
+_check_field_access(field, mode)
}
class Domain {
+AND(domains)
+optimize_full(model)
}
class Query {
}
class SQL {
}
DomainCondition --> BaseModel : uses for _to_sql
DomainCondition --> Domain : returns Domain in _optimize_step
DomainCondition --> Query : uses for _to_sql
DomainCondition --> SQL : returns SQL in _to_sql
BaseModel --> Query : uses for SQL generation
Domain --> DomainCondition : contains
Class diagram for GeoField and related database update methodsclassDiagram
class GeoField {
+entry_to_shape(value, same_type=False)
+update_geo_db_column(model)
+update_db_column(model, column)
+srid: int
+geo_type: str
+dim: int
+gist_index: bool
+name: str
+string: str
+column_type: tuple
+column_cast_from: list
}
class Model {
+env: Env
+_table: str
}
class Env {
+cr: Cursor
+_(msg, ...): str
}
class Cursor {
+fetchone()
}
GeoField --> Model : parameter for update methods
Model --> Env : has
Env --> Cursor : has
GeoField --> Env : uses for translation and db access
GeoField --> Cursor : uses for db operations
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
There was a problem hiding this comment.
Hey there - I've reviewed your changes - here's some feedback:
- The new domains.py module isn’t imported anywhere (e.g. in init.py), so its monkey-patches won’t be applied at runtime—please add an import to ensure your geo domain logic is loaded.
- Directly monkey-patching private methods like _condition_to_sql and _optimize_step is brittle across Odoo upgrades—consider leveraging the public Domain API extension points or a dedicated hook for geo operators to improve maintainability.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The new domains.py module isn’t imported anywhere (e.g. in __init__.py), so its monkey-patches won’t be applied at runtime—please add an import to ensure your geo domain logic is loaded.
- Directly monkey-patching private methods like _condition_to_sql and _optimize_step is brittle across Odoo upgrades—consider leveraging the public Domain API extension points or a dedicated hook for geo operators to improve maintainability.
## Individual Comments
### Comment 1
<location> `base_geoengine/domains.py:122-131` </location>
<code_context>
+def _optimize_step(self, model: BaseModel, level: OptimizationLevel) -> Domain:
</code_context>
<issue_to_address>
**suggestion:** Custom _optimize_step for geospatial operators may not cover all edge cases.
Please enhance validation and error handling to ensure all field types and invalid configurations are properly managed.
</issue_to_address>
### Comment 2
<location> `base_geoengine/expressions.py:41` </location>
<code_context>
-expression.SQL_OPERATORS.update(GEO_SQL_OPERATORS)
def _condition_to_sql(
- self, alias: str, fname: str, operator: str, value, query: Query
+ self,
</code_context>
<issue_to_address>
**issue (complexity):** Consider refactoring _condition_to_sql by delegating geo-operator logic to helpers and using SQL templates instead of manual string manipulation.
Here are a few targeted refactorings to flatten `_condition_to_sql`, remove raw mogrify/replace, and keep all functionality:
1) Delegate all the geo‐dict logic to a private helper, returning a single `SQL`
2) Replace manual mogrify + string‐replace with a proper `EXISTS(%s)` SQL template
3) Only patch the geo‐operators path, otherwise call `super()`
```python
# 1) Top‐level monkey‐patch keeps only geo‐operator check
def _condition_to_sql(self, field_expr, operator, value, model, alias, query):
if operator.startswith("geo_"):
return self._geo_to_sql(field_expr, operator, value, model, alias, query)
return super(fields.Field, self)._condition_to_sql(
field_expr=field_expr, operator=operator, value=value,
model=model, alias=alias, query=query,
)
# 2) Extracted helper for all geo operators
def _geo_to_sql(self, field_expr, operator, value, model, alias, query):
field = model._fields[field_expr]
assert isinstance(field, GeoField), "only on geo fields"
geo = GeoOperator(field)
params = []
if isinstance(value, dict):
return self._geo_dict_to_sql(field_expr, operator, value, model, alias)
# simple geo_ operator
sql_code = getattr(geo, f"get_{operator}_sql")(model._table, field_expr, value, params)
return SQL(sql_code, *params)
# 3) Helper for the dict‐of‐related branch
def _geo_dict_to_sql(self, field_expr, operator, ref_search, model, alias):
exists_clauses = []
for rel_name, domain in ref_search.items():
rel_model = model.env[rel_name]
rel_alias = f"{rel_model._table}_{uuid4().hex[:5]}"
rel_query = where_calc(rel_model, domain, active_test=True, alias=rel_alias)
model._check_field_access(model._fields[field_expr], "read")
# add the spatial predicate
if operator in ("geo_greater", "geo_lesser"):
rel_query.add_where(
SQL("ST_Area(%s.%s) %s ST_Area(%s.%s)",
alias, field_expr,
GEO_SQL_OPERATORS[operator],
rel_alias, rel_name.split(".")[-1],
)
)
else:
rel_query.add_where(
SQL("%s(%s.%s, %s.%s)",
GEO_SQL_OPERATORS[operator],
alias, field_expr,
rel_alias, rel_name.split(".")[-1],
)
)
sub = rel_query.subselect("1")
# use the SQL object directly instead of .mogrify + replace
exists_clauses.append(SQL("EXISTS(%s)", sub))
return SQL(" AND ").join(exists_clauses)
```
• Now the main patch is only ~20 lines
• We never manually manipulate the raw query string, rely on `SQL()` and its params
• All geo‐dict logic is isolated in `_geo_dict_to_sql`
• The default path simply calls the original super—no duplication of Odoo’s core logic.
</issue_to_address>
### Comment 3
<location> `base_geoengine/domains.py:33` </location>
<code_context>
+)
+
+
+def checked(self) -> DomainCondition:
+ """Validate `self` and return it if correct, otherwise raise an exception."""
+ if not isinstance(self.field_expr, str) or not self.field_expr:
</code_context>
<issue_to_address>
**issue (complexity):** Consider wrapping the original methods and only handling GEO_OPERATORS as a special case, falling back to the originals for all other logic.
```markdown
You don’t need to re-implement the entire `checked`/`_to_sql`/`_optimize_step` logic—just wrap the original methods and handle your GEO_OPERATORS case first, falling back immediately to the saved originals. This cuts out all the copied boilerplate and keeps behavior unchanged.
```python
# save originals
_orig_checked = DomainCondition.checked
_orig_to_sql = DomainCondition._to_sql
_orig_optimize = DomainCondition._optimize_step
GEO_OPERATORS = frozenset({
"geo_greater", "geo_lesser", "geo_equal", "geo_touch",
"geo_within", "geo_contains", "geo_intersect",
})
def checked(self) -> DomainCondition:
op = self.operator.lower()
if op in GEO_OPERATORS:
# (optional) warn if upper-case
if op != self.operator:
warnings.warn(
f"geo operator should be lower-case: {(self.field_expr, self.operator)!r}",
DeprecationWarning, stacklevel=2
)
return DomainCondition(self.field_expr, op, self.value).checked()
return self
return _orig_checked(self)
def _to_sql(self, model, alias, query) -> SQL:
if self.operator in GEO_OPERATORS:
assert self._opt_level >= OptimizationLevel.FULL, "Must optimize GEO first"
fld = self._field(model)
model._check_field_access(fld, "read")
return fld.condition_to_sql(
self.field_expr, self.operator, self.value, model, alias, query
)
return _orig_to_sql(self, model, alias, query)
def _optimize_step(self, model, level) -> Domain:
if self.operator in GEO_OPERATORS:
# bump level to FULL
optimized = DomainCondition(self.field_expr, self.operator, self.value)
object.__setattr__(optimized, "_opt_level", OptimizationLevel.FULL)
return optimized
return _orig_optimize(self, model, level)
# patch
DomainCondition.checked = checked
DomainCondition._to_sql = _to_sql
DomainCondition._optimize_step = _optimize_step
# expand the global set so `checked` sees them as valid
domains.CONDITION_OPERATORS |= GEO_OPERATORS
```
Benefits:
- no full copy of Odoo’s original logic
- clear “if GEO” / “else original” structure
- all existing behavior (and warnings) are preserved
- greatly reduced maintenance surface
```
</issue_to_address>
### Comment 4
<location> `base_geoengine/domains.py:51-55` </location>
<code_context>
if operator not in CONDITION_OPERATORS:
# <MOD>
if operator not in GEO_OPERATORS:
# </MOD>
self._raise("Invalid operator")
</code_context>
<issue_to_address>
**suggestion (code-quality):** Merge nested if conditions ([`merge-nested-ifs`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/merge-nested-ifs))
```suggestion
if operator not in CONDITION_OPERATORS and operator not in GEO_OPERATORS:
self._raise("Invalid operator")
```
<br/><details><summary>Explanation</summary>Too much nesting can make code difficult to understand, and this is especially
true in Python, where there are no brackets to help out with the delineation of
different nesting levels.
Reading deeply nested code is confusing, since you have to keep track of which
conditions relate to which levels. We therefore strive to reduce nesting where
possible, and the situation where two `if` conditions can be combined using
`and` is an easy win.
</details>
</issue_to_address>
### Comment 5
<location> `base_geoengine/expressions.py:41` </location>
<code_context>
def _condition_to_sql(
self,
field_expr: str,
operator: str,
value,
model: BaseModel,
alias: str,
query: Query,
) -> SQL:
"""
This method has been monkey patched in order to be able to include
geo_operators into the Odoo search method.
"""
if operator in GEO_OPERATORS.keys():
current_field = model._fields.get(field_expr)
current_operator = GeoOperator(current_field)
if current_field and isinstance(current_field, GeoField):
params = []
if isinstance(value, dict):
# We are having indirect geo_operator like (‘geom’, ‘geo_...’,
# {‘res.zip.poly’: [‘id’, ‘in’, [1,2,3]] })
ref_search = value
sub_queries = []
for key in ref_search:
i = key.rfind(".")
rel_model_name = key[0:i]
rel_col = key[i + 1 :]
rel_model = model.env[rel_model_name]
# we compute the attributes search on spatial rel
if ref_search[key]:
rel_alias = (
rel_model._table
+ "_"
+ "".join(random.choices(string.ascii_lowercase, k=5))
)
rel_query = where_calc(
rel_model,
ref_search[key],
active_test=True,
alias=rel_alias,
)
model._check_field_access(current_field, "read")
if operator == "geo_equal":
rel_query.add_where(
f'"{alias}"."{field_expr}" {GEO_OPERATORS[operator]} '
f"{rel_alias}.{rel_col}"
)
elif operator in ("geo_greater", "geo_lesser"):
rel_query.add_where(
f"ST_Area({alias}.{field_expr}) "
f"{GEO_OPERATORS[operator]} "
f"ST_Area({rel_alias}.{rel_col})"
)
else:
rel_query.add_where(
f'{GEO_OPERATORS[operator]}("{alias}"."{field_expr}", '
f"{rel_alias}.{rel_col})"
)
subquery_sql = rel_query.subselect("1")
sub_query_mogrified = (
model.env.cr.mogrify(subquery_sql.code, subquery_sql.params)
.decode("utf-8")
.replace(f"'{rel_model._table}'", f'"{rel_model._table}"')
.replace("%", "%%")
)
sub_queries.append(f"EXISTS({sub_query_mogrified})")
query_str = " AND ".join(sub_queries)
else:
query_str = get_geo_func(
current_operator, operator, field_expr, value, params, model._table
)
return SQL(query_str, *params)
return original___condition_to_sql(
self,
field_expr=field_expr,
operator=operator,
value=value,
model=model,
alias=alias,
query=query,
)
</code_context>
<issue_to_address>
**issue (code-quality):** We've found these issues:
- Replace a[0:x] with a[:x] and a[x:len(a)] with a[x:] ([`remove-redundant-slice-index`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/remove-redundant-slice-index/))
- Use set when checking membership of a collection of literals ([`collection-into-set`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/collection-into-set/))
- Low code quality found in \_condition\_to\_sql - 16% ([`low-code-quality`](https://docs.sourcery.ai/Reference/Default-Rules/comments/low-code-quality/))
<br/><details><summary>Explanation</summary>
The quality score for this function is below the quality threshold of 25%.
This score is a combination of the method length, cognitive complexity and working memory.
How can you solve this?
It might be worth refactoring this function to make it shorter and more readable.
- Reduce the function length by extracting pieces of functionality out into
their own functions. This is the most important thing you can do - ideally a
function should be less than 10 lines.
- Reduce nesting, perhaps by introducing guard clauses to return early.
- Ensure that variables are tightly scoped, so that code using related concepts
sits together within the function rather than being scattered.</details>
</issue_to_address>
### Comment 6
<location> `base_geoengine/fields.py:210-219` </location>
<code_context>
def update_db_column(self, model, column):
"""Create/update the column corresponding to ``self``.
For creation of geo column
:param model: an instance of the field's model
:param column: the column's configuration (dict)
if it exists, or ``None``
"""
# the column does not exist, create it
if not column:
create_geo_column(
model.env.cr,
model._table,
self.name,
self.geo_type.upper(),
self.srid,
self.dim,
self.string,
)
if self.gist_index:
create_geo_index(model.env.cr, self.name, model._table)
return
if column["udt_name"] == self.column_type[0]:
if self.gist_index:
create_geo_index(model.env.cr, self.name, model._table)
return
self.update_geo_db_column(model)
if column["udt_name"] in self.column_cast_from:
sql.convert_column(
model.env.cr, model._table, self.name, self.column_type[1]
)
else:
newname = (self.name + "_moved{}").format
i = 0
while sql.column_exists(model.env.cr, model._table, newname(i)):
i += 1
if column["is_nullable"] == "NO":
sql.drop_not_null(model.env.cr, model._table, self.name)
sql.rename_column(model.env.cr, model._table, self.name, newname(i))
sql.create_column(
model.env.cr, model._table, self.name, self.column_type[1], self.string
)
</code_context>
<issue_to_address>
**issue (code-quality):** Extract code out into method ([`extract-method`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/extract-method/))
</issue_to_address>
### Comment 7
<location> `base_geoengine/models/geo_vector_layer.py:100` </location>
<code_context>
@api.constrains("geo_field_id", "model_id")
def _check_geo_field_id(self):
for rec in self:
if rec.model_id:
if not rec.geo_field_id.model_id == rec.model_id:
raise ValidationError(
self.env._(
"The geo_field_id must be a field in %s model",
rec.model_id.display_name,
)
)
</code_context>
<issue_to_address>
**suggestion (code-quality):** Simplify logical expression using De Morgan identities ([`de-morgan`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/de-morgan/))
```suggestion
if rec.geo_field_id.model_id != rec.model_id:
```
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| def _optimize_step(self, model: BaseModel, level: OptimizationLevel) -> Domain: | ||
| """Optimization step for geospatial operators.""" | ||
| # For geospatial operators, we need to handle them specially during optimization | ||
| # If this is a geospatial operator, mark it as optimized at FULL level | ||
| if self.operator in GEO_OPERATORS: | ||
| # Perform basic validation and normalization | ||
| with contextlib.suppress(Exception): | ||
| field = self._field(model) | ||
| # Basic geospatial operator validation | ||
| if hasattr(field, "geo_type"): # It's a geospatial field |
There was a problem hiding this comment.
suggestion: Custom _optimize_step for geospatial operators may not cover all edge cases.
Please enhance validation and error handling to ensure all field types and invalid configurations are properly managed.
| expression.SQL_OPERATORS.update(GEO_SQL_OPERATORS) | ||
|
|
||
|
|
||
| def _condition_to_sql( |
There was a problem hiding this comment.
issue (complexity): Consider refactoring _condition_to_sql by delegating geo-operator logic to helpers and using SQL templates instead of manual string manipulation.
Here are a few targeted refactorings to flatten _condition_to_sql, remove raw mogrify/replace, and keep all functionality:
- Delegate all the geo‐dict logic to a private helper, returning a single
SQL - Replace manual mogrify + string‐replace with a proper
EXISTS(%s)SQL template - Only patch the geo‐operators path, otherwise call
super()
# 1) Top‐level monkey‐patch keeps only geo‐operator check
def _condition_to_sql(self, field_expr, operator, value, model, alias, query):
if operator.startswith("geo_"):
return self._geo_to_sql(field_expr, operator, value, model, alias, query)
return super(fields.Field, self)._condition_to_sql(
field_expr=field_expr, operator=operator, value=value,
model=model, alias=alias, query=query,
)
# 2) Extracted helper for all geo operators
def _geo_to_sql(self, field_expr, operator, value, model, alias, query):
field = model._fields[field_expr]
assert isinstance(field, GeoField), "only on geo fields"
geo = GeoOperator(field)
params = []
if isinstance(value, dict):
return self._geo_dict_to_sql(field_expr, operator, value, model, alias)
# simple geo_ operator
sql_code = getattr(geo, f"get_{operator}_sql")(model._table, field_expr, value, params)
return SQL(sql_code, *params)
# 3) Helper for the dict‐of‐related branch
def _geo_dict_to_sql(self, field_expr, operator, ref_search, model, alias):
exists_clauses = []
for rel_name, domain in ref_search.items():
rel_model = model.env[rel_name]
rel_alias = f"{rel_model._table}_{uuid4().hex[:5]}"
rel_query = where_calc(rel_model, domain, active_test=True, alias=rel_alias)
model._check_field_access(model._fields[field_expr], "read")
# add the spatial predicate
if operator in ("geo_greater", "geo_lesser"):
rel_query.add_where(
SQL("ST_Area(%s.%s) %s ST_Area(%s.%s)",
alias, field_expr,
GEO_SQL_OPERATORS[operator],
rel_alias, rel_name.split(".")[-1],
)
)
else:
rel_query.add_where(
SQL("%s(%s.%s, %s.%s)",
GEO_SQL_OPERATORS[operator],
alias, field_expr,
rel_alias, rel_name.split(".")[-1],
)
)
sub = rel_query.subselect("1")
# use the SQL object directly instead of .mogrify + replace
exists_clauses.append(SQL("EXISTS(%s)", sub))
return SQL(" AND ").join(exists_clauses)• Now the main patch is only ~20 lines
• We never manually manipulate the raw query string, rely on SQL() and its params
• All geo‐dict logic is isolated in _geo_dict_to_sql
• The default path simply calls the original super—no duplication of Odoo’s core logic.
| ) | ||
|
|
||
|
|
||
| def checked(self) -> DomainCondition: |
There was a problem hiding this comment.
issue (complexity): Consider wrapping the original methods and only handling GEO_OPERATORS as a special case, falling back to the originals for all other logic.
You don’t need to re-implement the entire `checked`/`_to_sql`/`_optimize_step` logic—just wrap the original methods and handle your GEO_OPERATORS case first, falling back immediately to the saved originals. This cuts out all the copied boilerplate and keeps behavior unchanged.
```python
# save originals
_orig_checked = DomainCondition.checked
_orig_to_sql = DomainCondition._to_sql
_orig_optimize = DomainCondition._optimize_step
GEO_OPERATORS = frozenset({
"geo_greater", "geo_lesser", "geo_equal", "geo_touch",
"geo_within", "geo_contains", "geo_intersect",
})
def checked(self) -> DomainCondition:
op = self.operator.lower()
if op in GEO_OPERATORS:
# (optional) warn if upper-case
if op != self.operator:
warnings.warn(
f"geo operator should be lower-case: {(self.field_expr, self.operator)!r}",
DeprecationWarning, stacklevel=2
)
return DomainCondition(self.field_expr, op, self.value).checked()
return self
return _orig_checked(self)
def _to_sql(self, model, alias, query) -> SQL:
if self.operator in GEO_OPERATORS:
assert self._opt_level >= OptimizationLevel.FULL, "Must optimize GEO first"
fld = self._field(model)
model._check_field_access(fld, "read")
return fld.condition_to_sql(
self.field_expr, self.operator, self.value, model, alias, query
)
return _orig_to_sql(self, model, alias, query)
def _optimize_step(self, model, level) -> Domain:
if self.operator in GEO_OPERATORS:
# bump level to FULL
optimized = DomainCondition(self.field_expr, self.operator, self.value)
object.__setattr__(optimized, "_opt_level", OptimizationLevel.FULL)
return optimized
return _orig_optimize(self, model, level)
# patch
DomainCondition.checked = checked
DomainCondition._to_sql = _to_sql
DomainCondition._optimize_step = _optimize_step
# expand the global set so `checked` sees them as valid
domains.CONDITION_OPERATORS |= GEO_OPERATORSBenefits:
- no full copy of Odoo’s original logic
- clear “if GEO” / “else original” structure
- all existing behavior (and warnings) are preserved
- greatly reduced maintenance surface
5c4fead to
07b65c6
Compare
Delta for Migration
Summary by Sourcery
Migrate base_geoengine to Odoo 19.0 by updating core APIs, integrating geospatial domain handling into the new DomainCondition pipeline, and refreshing module metadata and docs.
Enhancements: